# Source code for qtealeaves.observables.tnobservables

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Observable to recap all the observables that can be measured by the TN
"""

import os
import re
import warnings
from collections import OrderedDict
from copy import deepcopy
from pathlib import Path

import h5py

from qtealeaves import __version__
from qtealeaves.tooling import QTeaLeavesError
from qtealeaves.tooling.parameterized import _ParameterizedClass

from .bond_entropy import TNObsBondEntropy
from .correlation import TNObsCorr, TNObsCorr4
from .custom_correlation import TNObsCustom
from .custom_function_obs import TNCustomFunctionObs
from .distance2pure import TNDistance2Pure
from .local import TNObsLocal
from .log_negativity import TNObsLogNegativity
from .mpo_densempolist import TNObsDenseMPOList
from .probabilities import TNObsProbabilities
from .projective import TNObsProjective
from .state2file import TNState2File
from .tensor_product import TNObsTensorProduct
from .timecorrelator import TNObsTZeroCorr
from .weighted_sum import TNObsWeightedSum

__all__ = ["TNObservables"]


class TNObservables(_ParameterizedClass):
    """
    Organization of all the measurements to be taken during a tensor
    network simulation. New observables are registered in the container
    via the :code:`+=` operator as shown in the example.

    **Example**

    .. code-block:: python

        obs = TNObservables()
        obs += any_other_observable

    **Arguments**

    filename_observables : str
        Base filename of the definition with the observables inside the
        input folder and observables subfolder. A postfix might be appended
        upon need. The file extension will be chosen by the backend.

    folder_observables : str
        Subfolder for the observable input files inside the input folder.

    num_trajectories : int
        Total number of quantum trajectories.

    do_write_hdf5 : bool
        Flag selecting hdf5 output instead of plain text on reading.

    **Details**

    Up to now, the class organizes and accepts observables of the type
    :class:`TNObsLocal`, :class:`TNObsCorr`, :class:`TNDistance2Pure`,
    :class:`TNState2File`, :class:`TNObsTensorProduct`,
    :class:`TNObsWeightedSum`, :class:`TNObsProjective`,
    :class:`TNObsProbabilities`, :class:`TNObsBondEntropy`,
    :class:`TNObsCustom`, :class:`TNObsDenseMPOList`
    """

    def __init__(
        self,
        filename_observables="observables.in",
        folder_observables="observables",
        num_trajectories=1,
        do_write_hdf5=False,
    ):
        self.results_buffer = {}
        self.filename_observables = filename_observables
        self.folder_observables = folder_observables
        self.num_trajectories = num_trajectories
        self.do_write_hdf5 = do_write_hdf5

        # One empty container per supported observable type, keyed by the
        # repr of the empty instance; `add_observable` matches on this key.
        supported_types = (
            TNObsLocal,
            TNObsCorr,
            TNDistance2Pure,
            TNState2File,
            TNObsTensorProduct,
            TNObsWeightedSum,
            TNObsProjective,
            TNObsProbabilities,
            TNObsBondEntropy,
            TNObsTZeroCorr,
            TNObsCorr4,
            TNObsCustom,
            TNObsDenseMPOList,
            TNObsLogNegativity,
            TNCustomFunctionObs,
        )
        self.obs_list = OrderedDict(
            (repr(obs), obs) for obs in (cls.empty() for cls in supported_types)
        )
[docs] def get_num_trajectories(self, **kwargs): """ Get number of quantum trajectories **Arguments** params : keyword argument A dictionary with parameters is accepted as a keyword argument. """ params = kwargs.get("params", {}) num_trajectories = self.eval_numeric_param(self.num_trajectories, params) # the first entry of the seed list should be <4096 if num_trajectories > 4095: raise QTeaLeavesError("Number of trajectory over the limit for the seed.") return num_trajectories
[docs] def add_observable(self, obj): """ Add a specific observable to the measurements. The class of the observable added must match the predefined observables inside this class. """ if repr(obj) not in self.obs_list: raise QTeaLeavesError(f"Observables not of valid type: {repr(obj)}") self.obs_list[repr(obj)] += obj
def __iadd__(self, obj): """ Overwrite operator ``+=`` to simplify syntax. """ self.add_observable(obj) return self
[docs] @staticmethod def add_trajectories(all_results, new): """ Add the observables for different quantum trajectories. **Arguments** all_results : dict Dictionary with observables. new : dict Dictionary with new observables to add to all_results. """ for name in ["energy", "norm"]: if name not in all_results: all_results[name] = new[name] else: all_results[name] += new[name] # `time` is the time of the quench, not the CPU time # Must be the same for all simulations if dynamics # Do not add, just pick if it is present if ("time" not in all_results) and ("time" in new): all_results["time"] = new["time"] return all_results
[docs] @staticmethod def avg_trajectories(all_results, num_trajectories): """ Get the average of quantum trajectories observables. **Arguments** all_results : dict Dictionary with observables. num_trajectories : int Total number of quantum trajectories. """ # skip time, already not added up since it is the time # in the simulation (must be same). for name in ["energy", "norm"]: all_results[name] /= num_trajectories return all_results
[docs] def collect_operators(self): """ Collect all operators with some additional information. This function is just collecting the information from all observables stored within this class. """ op_lst = [] for elem in self.obs_list: for op_tuple in self.obs_list[elem].collect_operators(): op_lst.append(op_tuple) return list(set(op_lst))
[docs] def write_results(self, filename, params, state_ansatz): """ Write the complete measurement file mocking the fortran output. The assumption is that the results buffer of each measurement was set with all the required results. **Arguments** filename : str Target location of the file. params : dict Simulation parameters. state_ansatz : str Label identifying the state ansatz currently in use. """ # We probably need params in the future - avoid unused argument error _ = len(params) with open(filename, "w+") as fh: # First line is version fh.write("PY-" + str(__version__) + "\n") # Separator line fh.write("-" * 20 + "\n") # Energy and norm each in one line energy = self.results_buffer["energy"] if hasattr(energy, "__len__"): # Resolve list via last element energy = energy[-1] fh.write(str(energy) + "\n") fh.write(str(self.results_buffer["norm"]) + "\n") if "time" in self.results_buffer: fh.write(str(self.results_buffer["time"]) + "\n") else: fh.write(str(-999e300) + "\n") self.results_buffer = {} for elem in self.obs_list: self.obs_list[elem].write_results(fh, state_ansatz=state_ansatz)
[docs] @staticmethod def read_cpu_time_from_log(filename_result, params): """ Read the CPU time if it can be resolved via the log file. **Arguments** filename_result : str Filename to the output file including the path. This filename can vary for statics and dynamics and therefore has to be passed. Function assumes that the log-file is stored next to it. **Returns** Iterator returns key, value pair if available. """ regex = re.compile(r"CPU time:?\s*([\d\.]+)") log_file = Path(filename_result).parent / params.get("log_file", "sim.log") if not log_file.is_file(): return with log_file.open() as log_file: for line in reversed(log_file.readlines()): match = regex.search(line) if match: yield "cpu_time", float(match.group(1)) break
# pylint: disable-next=too-many-locals
[docs] def read_file(self, filename, params): """ Read all the results from a single file and store them in a dictionary. **Arguments** filename : str Filename to the output file including the path. This filename can vary for statics and dynamics and therefore has to be passed. params : dict The parameter dictionary, which is required to obtain the output folder. """ results = {} if self.eval_numeric_param(elem=self.do_write_hdf5, params=params): # read from the hdf5 file # filename is the same as for text, but with last three chars replaced by hdf5 hdf5_filename = filename[:-3] + "hdf5" with h5py.File(hdf5_filename, "r") as h5f: # The structure of the file is: # version # energy # time # norm # /TNObsLocal/0, 1, 2, ... # /TNObsCorr/0/real # /imag results["energy"] = h5f["energy"][()] results["time"] = h5f["time"][()] results["norm"] = h5f["norm"][()] for elem in self.obs_list: obs_type_name = elem obs_object = self.obs_list[elem] if type(obs_object) in [TNObsLocal, TNObsCorr]: # Currently only enabled for these two measurements # read if there is a dataset if obs_type_name in h5f: dataset = h5f[obs_type_name] for key, value in obs_object.read_hdf5( dataset, params=params ): results[key] = value else: warnings.warn( f"NOT IMPLEMENTED YET: Instance is {type(obs_object)}. " + "It does not support hdf5 read/writing yet. Use do_write_hdf5=False." 
) else: # reading from .dat file with open(filename, "r") as fh: # First line is IO version (for future use) _ = fh.readline() # Separator _ = fh.readline() energy = fh.readline() results["energy"] = float(energy) norm = fh.readline() results["norm"] = float(norm) time = fh.readline() if "j" in time: # Detect complex number (complex time evolution) via j in string for imag part results["time"] = complex(time) elif float(time) > 0.0: results["time"] = float(time) for key, value in self.read_cpu_time_from_log(filename, params): results[key] = value for elem in self.obs_list: for key, value in self.obs_list[elem].read(fh, params=params): results[key] = value return results
[docs] def read(self, filename, folder, params): """ Read all the results and store them in a dictionary. **Arguments** filename : str Filename of the output file (not including the path). This filename can vary for statics and dynamics and therefore has to be passed. folder : str Folder of the output file. params : dict The parameter dictionary, which is required to obtain the output folder. """ num_trajectories = self.get_num_trajectories(params=params) if num_trajectories == 1: # no quantum trajectories folder_name_output = self.eval_str_param(folder, params) # get results # pylint: disable-next=no-member full_file_path = os.path.join(folder_name_output, filename) results = self.read_file(full_file_path, params) else: # read results for quantum trajectories results = {} for ii in range(num_trajectories): # add trajectory id into params tmp = deepcopy(params) tmp["trajectory_id"] = self.get_id_trajectories(ii) tmp["seed"] = self.get_seed_trajectories(tmp) folder_name_output = self.get_folder_trajectories(folder, tmp) # get results for each quantum trajectory # pylint: disable-next=no-member full_file_path = os.path.join(folder_name_output, filename) name_ii = ("trajectory", tmp["trajectory_id"]) results[name_ii] = self.read_file(full_file_path, params) results[name_ii]["seed"] = tmp["seed"] # add the observables results = self.add_trajectories(results, results[name_ii]) for elem in self.obs_list: results = self.obs_list[elem].add_trajectories( results, results[name_ii] ) # compute the average of the observables results = self.avg_trajectories(results, num_trajectories) for elem in self.obs_list: results = self.obs_list[elem].avg_trajectories( results, num_trajectories ) return results
[docs] @staticmethod def get_id_trajectories(val): """ Get the id for the given quantum trajectory. **Arguments** val : int value for id of the quantum trajectory. """ return val
[docs] @staticmethod def get_seed_trajectories(params): """ Get the seed for the given quantum trajectory. **Arguments** params : dict or list of dicts The parameter dictionary or dictionaries, which is required to obtain the output folder. **Returns** seed : list The seed for the simulation has 4 entries. The following rules applies: - entries must be smaller than 4096 - entries must be bigger than 0 - last entry must be odd """ if "seed" in params: raise QTeaLeavesError("The seed cannot be specified by the user.") seed = [params["trajectory_id"] + 1, 13, 17, 19] return seed
[docs] def get_folder_trajectories(self, folder_name, params): """ Evaluate the folder name and add the trajectory id if running trajectories. **Arguments** folder_name : str Name of the input/output folder for the simulation. params : dict or list of dicts The parameter dictionary or dictionaries, which is required to obtain the output folder. **Returns** folder_name : str Modified folder name with the trajectory id. """ num_trajectories = self.get_num_trajectories(params=params) folder_name = self.eval_str_param(folder_name, params) if num_trajectories != 1: folder_name = folder_name + "/qt_" + str(params["trajectory_id"]) return folder_name